Use ingestion-client in the Shuffler
#4024
base: main
Conversation
force-pushed from 8008c9c to 91f6046
tillrohrmann
left a comment
Thanks a lot for replacing the direct Bifrost write with the IngressClient in the Shuffle @muhamadazmy. The name IngressClient may no longer fit 100% now that the Shuffle also uses it; something like IngestionClient might work better. Given that we don't use the send window of the IngressClient yet, I wouldn't expect different runtime behavior of the shuffle. Once we have this, I would be interested in how much the overall shuffle throughput increases by using the IngressClient.
I left a few minor comments for your consideration.
```rust
ingress
    .ingest(
        msg.partition_key(),
        IngestRecord::from_parts(msg.record_keys(), msg),
    )
    .await?;
```
As a follow-up to this PR we should make use of being able to send more than a single record at a time to maximize our throughput. I guess this will require an overhaul of the Shuffle component. I think quite a few things can be simplified here (no more pin projecting if we don't require shuffle_next_message to run in the select arm, etc.).
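A rough shape of what the batched path could look like. Note that `StubIngress`, `Record`, and `ingest_batch` below are stand-in types invented for illustration; they are not the crate's real API, which this PR stack is still shaping:

```rust
// Illustrative only: these are made-up stand-ins for the real
// ingestion-client types, sketching "drain pending messages, send once".
#[allow(dead_code)]
struct Record {
    partition_key: u64,
    payload: Vec<u8>,
}

struct StubIngress {
    sent_batches: usize,
}

impl StubIngress {
    // Send a whole batch at once instead of one record per call,
    // amortizing the per-send overhead across many records.
    fn ingest_batch(&mut self, records: &[Record]) -> Result<usize, String> {
        if records.is_empty() {
            return Err("empty batch".to_string());
        }
        self.sent_batches += 1;
        Ok(records.len())
    }
}

fn main() {
    let mut ingress = StubIngress { sent_batches: 0 };
    // Drain pending shuffle messages into one batch rather than
    // awaiting an ingest per message.
    let batch: Vec<Record> = (0..16)
        .map(|i| Record { partition_key: i % 4, payload: vec![0u8; 8] })
        .collect();
    let accepted = ingress.ingest_batch(&batch).unwrap();
    println!("{} records in {} batch(es)", accepted, ingress.sent_batches);
}
```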
I totally agree.
crates/worker/src/lib.rs
Outdated
```rust
networking.clone(),
Metadata::with_current(|m| m.updateable_partition_table()),
partition_routing.clone(),
NonZeroUsize::new(5 * 1024 * 1024).unwrap(), // 5MB
```
How do we best size the buffer window to fully utilize the network connections between the partition processors? Should this be something along the lines of RTT * bandwidth * #nodes * 2 to be able to keep all connections fully utilized?
Probably also a good idea to make this configurable.
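One way to reason about the sizing question above is the bandwidth-delay product. The sketch below is an illustrative back-of-the-envelope calculation, not code from this PR, and `inflight_buffer_bytes` is a made-up helper:

```rust
// Hypothetical sizing helper: buffer ≈ RTT * bandwidth * #nodes * 2,
// i.e. the bandwidth-delay product per connection, summed over peer
// nodes and doubled for headroom. Integer microseconds avoid float
// rounding in the example numbers.
fn inflight_buffer_bytes(rtt_micros: u64, bandwidth_bytes_per_sec: u64, nodes: u64) -> u64 {
    rtt_micros * bandwidth_bytes_per_sec / 1_000_000 * nodes * 2
}

fn main() {
    // e.g. 1ms RTT, ~1GiB/s links, 3 peer partition processors
    let buf = inflight_buffer_bytes(1_000, 1 << 30, 3);
    println!("{buf}"); // ~6.1 MiB, in the same ballpark as the hard-coded 5MB
}
```

Making this configurable, as suggested, would let operators plug in their own RTT and bandwidth figures instead of a fixed constant.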
That's the buffer for the inflight records, which is shared across partition sessions. It should eventually be different from the partition-session chunk size (work in progress), which is the one that has to respect the max network request size.
There seem to be a few test failures on GHA.
```rust
ingress
    .ingest(
        msg.partition_key(),
        IngestRecord::from_parts(msg.record_keys(), msg),
    )
    .await?;
```
What about support for rolling upgrades?
ingest does not fail unless the ingestion client is closed. This means the worst case is that it will block until leaders are responsive.
I think to avoid this situation we could release support for the Ingest messages in the PP first, before actually using them in the following release.
force-pushed from d6e9955 to 8c0797e
force-pushed from d14fb10 to e1b4c8e
force-pushed from f03e3c2 to 078d17d
- `ingestion-client` implements the runtime layer that receives WAL envelopes, fans them out to the correct partition, and tracks completion. It exposes:
  - `IngestionClient`, which enforces inflight budgets and resolves partition IDs before sending work downstream.
  - The session subsystem, which batches `IngestRecord`s, retries connections, and reports commit status to callers.
- `ingestion-client` only ingests records and notifies the caller once the record is "committed" to Bifrost by the PP. This makes it useful for implementing the Kafka ingress and other external ingestion.
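The inflight-budget behavior described above can be sketched as a simple byte accounting structure. This is an assumption about the mechanism, not the crate's actual code; `InflightBudget`, `try_acquire`, and `release` are invented names, and the real async client would await rather than return `false`:

```rust
// Sketch of an inflight byte budget of the kind IngestionClient is
// described as enforcing. Names and mechanics here are illustrative.
struct InflightBudget {
    capacity: usize,
    used: usize,
}

impl InflightBudget {
    fn new(capacity: usize) -> Self {
        Self { capacity, used: 0 }
    }

    // Reserve space for a record before sending it downstream; a real
    // async client would park the caller here instead of returning false.
    fn try_acquire(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.capacity {
            self.used += bytes;
            true
        } else {
            false
        }
    }

    // Release once the PP reports the record committed to Bifrost.
    fn release(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}

fn main() {
    let mut budget = InflightBudget::new(5 * 1024 * 1024); // 5MB, as in the PR
    assert!(budget.try_acquire(4 * 1024 * 1024));
    assert!(!budget.try_acquire(2 * 1024 * 1024)); // would exceed the window
    budget.release(4 * 1024 * 1024); // commit notification frees the budget
    assert!(budget.try_acquire(2 * 1024 * 1024));
    println!("ok");
}
```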
Summary: Handle the incoming `IngestRequest` messages sent by the `ingestion-client`
Summary: Refactor ingress-kafka to leverage the `ingestion-client` implementation. This replaces the previous direct write to Bifrost, which allows:
- Batching, which increases throughput
- The PP becoming the sole writer of its logs (WIP restatedev#3965)
- Use IngestionClient instead of Bifrost to write to partition logs
- Remove deprecated `delete_invocation`
Summary: This PR makes sure the cleaner does not do an external Bifrost write, by creating a cleaner effect stream that can be handled directly by the PP event loop.
Avoid direct writes to bifrost in shuffler by using a dedicated ingestion-client instance.
Stack created with Sapling. Best reviewed with ReviewStack.
- Use ingestion-client in the Shuffler #4024
- `IngestionClient` for invocation and state mgmt #3980
- `ingestion-client` #3975
- `IngestRequest` message #3974
- `CommitToken` back from notify_committed() #3968
- `ingestion-client` crate #3976